//  Copyright (c) 2020-present,  INSPUR Co, Ltd.  All rights reserved.
// This source code is licensed under Apache 2.0 License.

#include "wal_handler.h"

#include <ctime>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>

#include "db/column_family.h"
#include "db/db_impl.h"
#include "db/log_writer.h"
#include "db/version_set.h"
#include "env/mock_env.h"
#include "pure_mem/std_out_logger.h"
#include "rocksdb/cache.h"
#include "rocksdb/write_batch.h"
#include "rocksdb/write_buffer_manager.h"
#include "util/file_reader_writer.h"
#include "util/string_util.h"
#include "util/testharness.h"
#include "util/testutil.h"

#define NUM_THREADS 20

namespace rocksdb {

namespace anon {
class AtomicCounter {
 public:
  explicit AtomicCounter(Env* env = NULL)
      : env_(env), cond_count_(&mu_), count_(0) {}

  void Increment() {
    MutexLock l(&mu_);
    count_++;
    cond_count_.SignalAll();
  }

  int Read() {
    MutexLock l(&mu_);
    return count_;
  }

  bool WaitFor(int count) {
    MutexLock l(&mu_);

    uint64_t start = env_->NowMicros();
    while (count_ < count) {
      uint64_t now = env_->NowMicros();
      cond_count_.TimedWait(now + /*1s*/ 1 * 1000 * 1000);
      if (env_->NowMicros() - start > /*10s*/ 10 * 1000 * 1000) {
        return false;
      }
      if (count_ < count) {
        GTEST_LOG_(WARNING) << "WaitFor is taking more time than usual";
      }
    }

    return true;
  }

  void Reset() {
    MutexLock l(&mu_);
    count_ = 0;
    cond_count_.SignalAll();
  }

 private:
  Env* env_;
  port::Mutex mu_;
  port::CondVar cond_count_;
  int count_;
};

struct OptionsOverride {
  std::shared_ptr<const FilterPolicy> filter_policy = nullptr;
  // These will be used only if filter_policy is set
  bool partition_filters = false;
  uint64_t metadata_block_size = 1024;

  // Used as a bit mask of individual enums in which to skip an XF test point
  int skip_policy = 0;
};

}  // namespace anon

// Env wrapper used by tests. It forwards to a base Env while counting selected
// operations and injecting delays or failures, all controlled through the
// public data members declared at the bottom of the class.
class SpecialEnv : public EnvWrapper {
 public:
  explicit SpecialEnv(Env* base);

  // Creates `f` through the wrapped Env and, depending on substrings of the
  // file name, wraps the result in SSTableFile (".sst"), ManifestFile
  // ("MANIFEST") or WalFile ("log") so the matching switches apply.
  // Before creating the file it may fail on purpose: randomly, with a
  // probability of non_writeable_rate_ percent, or deterministically while
  // non_writable_count_ is positive.
  Status NewWritableFile(const std::string& f, std::unique_ptr<WritableFile>* r,
                         const EnvOptions& soptions) override {
    // Wrapper for table files: honours table_write_callback_, drop_writes_,
    // no_space_ and delay_sstable_sync_, and accumulates bytes_written_.
    class SSTableFile : public WritableFile {
     private:
      SpecialEnv* env_;
      std::unique_ptr<WritableFile> base_;

     public:
      SSTableFile(SpecialEnv* env, std::unique_ptr<WritableFile>&& base)
          : env_(env), base_(std::move(base)) {}
      Status Append(const Slice& data) override {
        if (env_->table_write_callback_) {
          (*env_->table_write_callback_)();
        }
        if (env_->drop_writes_.load(std::memory_order_acquire)) {
          // Drop writes on the floor
          return Status::OK();
        } else if (env_->no_space_.load(std::memory_order_acquire)) {
          return Status::NoSpace("No space left on device");
        } else {
          env_->bytes_written_ += data.size();
          return base_->Append(data);
        }
      }
      Status PositionedAppend(const Slice& data, uint64_t offset) override {
        if (env_->table_write_callback_) {
          (*env_->table_write_callback_)();
        }
        if (env_->drop_writes_.load(std::memory_order_acquire)) {
          // Drop writes on the floor
          return Status::OK();
        } else if (env_->no_space_.load(std::memory_order_acquire)) {
          return Status::NoSpace("No space left on device");
        } else {
          env_->bytes_written_ += data.size();
          return base_->PositionedAppend(data, offset);
        }
      }
      Status Truncate(uint64_t size) override { return base_->Truncate(size); }
      Status RangeSync(uint64_t offset, uint64_t nbytes) override {
        Status s = base_->RangeSync(offset, nbytes);
#if !(defined NDEBUG) || !defined(OS_WIN)
        TEST_SYNC_POINT_CALLBACK("SpecialEnv::SStableFile::RangeSync", &s);
#endif  // !(defined NDEBUG) || !defined(OS_WIN)
        return s;
      }
      Status Close() override {
// SyncPoint is not supported in Released Windows Mode.
#if !(defined NDEBUG) || !defined(OS_WIN)
        // Check preallocation size
        // preallocation size is never passed to base file.
        size_t preallocation_size = preallocation_block_size();
        TEST_SYNC_POINT_CALLBACK("DBTestWritableFile.GetPreallocationStatus",
                                 &preallocation_size);
#endif  // !(defined NDEBUG) || !defined(OS_WIN)
        Status s = base_->Close();
#if !(defined NDEBUG) || !defined(OS_WIN)
        TEST_SYNC_POINT_CALLBACK("SpecialEnv::SStableFile::Close", &s);
#endif  // !(defined NDEBUG) || !defined(OS_WIN)
        return s;
      }
      Status Flush() override { return base_->Flush(); }
      Status Sync() override {
        ++env_->sync_counter_;
        // Poll every 100ms until the test clears delay_sstable_sync_.
        while (env_->delay_sstable_sync_.load(std::memory_order_acquire)) {
          env_->SleepForMicroseconds(100000);
        }
        Status s = base_->Sync();
#if !(defined NDEBUG) || !defined(OS_WIN)
        TEST_SYNC_POINT_CALLBACK("SpecialEnv::SStableFile::Sync", &s);
#endif  // !(defined NDEBUG) || !defined(OS_WIN)
        return s;
      }
      void SetIOPriority(Env::IOPriority pri) override {
        base_->SetIOPriority(pri);
      }
      Env::IOPriority GetIOPriority() override {
        return base_->GetIOPriority();
      }
      bool use_direct_io() const override { return base_->use_direct_io(); }
      Status Allocate(uint64_t offset, uint64_t len) override {
        return base_->Allocate(offset, len);
      }
    };
    // Wrapper for MANIFEST files: can fail Append() and Sync() on demand via
    // manifest_write_error_ and manifest_sync_error_.
    class ManifestFile : public WritableFile {
     public:
      ManifestFile(SpecialEnv* env, std::unique_ptr<WritableFile>&& b)
          : env_(env), base_(std::move(b)) {}
      Status Append(const Slice& data) override {
        if (env_->manifest_write_error_.load(std::memory_order_acquire)) {
          return Status::IOError("simulated writer error");
        } else {
          return base_->Append(data);
        }
      }
      Status Truncate(uint64_t size) override { return base_->Truncate(size); }
      Status Close() override { return base_->Close(); }
      Status Flush() override { return base_->Flush(); }
      Status Sync() override {
        ++env_->sync_counter_;
        if (env_->manifest_sync_error_.load(std::memory_order_acquire)) {
          return Status::IOError("simulated sync error");
        } else {
          return base_->Sync();
        }
      }
      uint64_t GetFileSize() override { return base_->GetFileSize(); }
      Status Allocate(uint64_t offset, uint64_t len) override {
        return base_->Allocate(offset, len);
      }

     private:
      SpecialEnv* env_;
      std::unique_ptr<WritableFile> base_;
    };
    // Wrapper for WAL files: keeps num_open_wal_file_ up to date and can fail
    // or slow down Append() via log_write_error_ / log_write_slowdown_.
    class WalFile : public WritableFile {
     public:
      WalFile(SpecialEnv* env, std::unique_ptr<WritableFile>&& b)
          : env_(env), base_(std::move(b)) {
        env_->num_open_wal_file_.fetch_add(1);
      }
      ~WalFile() override { env_->num_open_wal_file_.fetch_add(-1); }
      Status Append(const Slice& data) override {
#if !(defined NDEBUG) || !defined(OS_WIN)
        TEST_SYNC_POINT("SpecialEnv::WalFile::Append:1");
#endif
        Status s;
        if (env_->log_write_error_.load(std::memory_order_acquire)) {
          s = Status::IOError("simulated writer error");
        } else {
          int slowdown =
              env_->log_write_slowdown_.load(std::memory_order_acquire);
          if (slowdown > 0) {
            env_->SleepForMicroseconds(slowdown);
          }
          s = base_->Append(data);
        }
#if !(defined NDEBUG) || !defined(OS_WIN)
        TEST_SYNC_POINT("SpecialEnv::WalFile::Append:2");
#endif
        return s;
      }
      Status Truncate(uint64_t size) override { return base_->Truncate(size); }
      Status Close() override {
// SyncPoint is not supported in Released Windows Mode.
#if !(defined NDEBUG) || !defined(OS_WIN)
        // Check preallocation size
        // preallocation size is never passed to base file.
        size_t preallocation_size = preallocation_block_size();
        TEST_SYNC_POINT_CALLBACK("DBTestWalFile.GetPreallocationStatus",
                                 &preallocation_size);
#endif  // !(defined NDEBUG) || !defined(OS_WIN)

        return base_->Close();
      }
      Status Flush() override { return base_->Flush(); }
      Status Sync() override {
        ++env_->sync_counter_;
        return base_->Sync();
      }
      bool IsSyncThreadSafe() const override {
        return env_->is_wal_sync_thread_safe_.load();
      }
      Status Allocate(uint64_t offset, uint64_t len) override {
        return base_->Allocate(offset, len);
      }

     private:
      SpecialEnv* env_;
      std::unique_ptr<WritableFile> base_;
    };

    // Random failure: draw a number in [0, 100) under the lock that guards
    // rnd_ and fail when it falls below the configured rate.
    if (non_writeable_rate_.load(std::memory_order_acquire) > 0) {
      uint32_t random_number;
      {
        MutexLock l(&rnd_mutex_);
        random_number = rnd_.Uniform(100);
      }
      if (random_number < non_writeable_rate_.load()) {
        return Status::IOError("simulated random write error");
      }
    }

    new_writable_count_++;

    // Deterministic failure: consume one unit of the failure budget.
    if (non_writable_count_.load() > 0) {
      non_writable_count_--;
      return Status::IOError("simulated write error");
    }

    // The file kind is decided by substring match on the name.
    // std::string::find is used so that no C string header is required.
    const auto name_has = [&f](const char* token) {
      return f.find(token) != std::string::npos;
    };
    const bool is_manifest = name_has("MANIFEST");
    const bool is_log = name_has("log");

    // MANIFEST and log files are always opened without mmap or direct writes.
    EnvOptions optimized = soptions;
    if (is_manifest || is_log) {
      optimized.use_mmap_writes = false;
      optimized.use_direct_writes = false;
    }

    Status s = target()->NewWritableFile(f, r, optimized);
    if (s.ok()) {
      if (name_has(".sst")) {
        r->reset(new SSTableFile(this, std::move(*r)));
      } else if (is_manifest) {
        r->reset(new ManifestFile(this, std::move(*r)));
      } else if (is_log) {
        r->reset(new WalFile(this, std::move(*r)));
      }
    }
    return s;
  }

  // Opens `f` through the wrapped Env and counts the attempt in
  // random_file_open_counter_. When count_random_reads_ is set, the file is
  // wrapped so that reads bump random_read_counter_ and
  // random_read_bytes_counter_. A positive compaction_readahead_size in
  // `soptions` is recorded in compaction_readahead_size_ on success.
  Status NewRandomAccessFile(const std::string& f,
                             std::unique_ptr<RandomAccessFile>* r,
                             const EnvOptions& soptions) override {
    class CountingFile : public RandomAccessFile {
     public:
      CountingFile(std::unique_ptr<RandomAccessFile>&& target,
                   anon::AtomicCounter* counter,
                   std::atomic<size_t>* bytes_read)
          : target_(std::move(target)),
            counter_(counter),
            bytes_read_(bytes_read) {}
      virtual Status Read(uint64_t offset, size_t n, Slice* result,
                          char* scratch) const override {
        counter_->Increment();
        Status s = target_->Read(offset, n, result, scratch);
        *bytes_read_ += result->size();
        return s;
      }

      virtual Status Prefetch(uint64_t offset, size_t n) override {
        Status s = target_->Prefetch(offset, n);
        *bytes_read_ += n;
        return s;
      }

     private:
      std::unique_ptr<RandomAccessFile> target_;
      anon::AtomicCounter* counter_;
      std::atomic<size_t>* bytes_read_;
    };

    Status s = target()->NewRandomAccessFile(f, r, soptions);
    random_file_open_counter_++;
    if (s.ok() && count_random_reads_) {
      r->reset(new CountingFile(std::move(*r), &random_read_counter_,
                                &random_read_bytes_counter_));
    }
    if (s.ok() && soptions.compaction_readahead_size > 0) {
      compaction_readahead_size_ = soptions.compaction_readahead_size;
    }
    return s;
  }

  // Opens `f` through the wrapped Env. When count_sequential_reads_ is set,
  // the file is wrapped so that each Read() bumps sequential_read_counter_.
  virtual Status NewSequentialFile(const std::string& f,
                                   std::unique_ptr<SequentialFile>* r,
                                   const EnvOptions& soptions) override {
    class CountingFile : public SequentialFile {
     public:
      CountingFile(std::unique_ptr<SequentialFile>&& target,
                   anon::AtomicCounter* counter)
          : target_(std::move(target)), counter_(counter) {}
      virtual Status Read(size_t n, Slice* result, char* scratch) override {
        counter_->Increment();
        return target_->Read(n, result, scratch);
      }
      virtual Status Skip(uint64_t n) override { return target_->Skip(n); }

     private:
      std::unique_ptr<SequentialFile> target_;
      anon::AtomicCounter* counter_;
    };

    Status s = target()->NewSequentialFile(f, r, soptions);
    if (s.ok() && count_sequential_reads_) {
      r->reset(new CountingFile(std::move(*r), &sequential_read_counter_));
    }
    return s;
  }

  // Counts the call in sleep_counter_. The requested time is added to
  // addon_time_ when no_slowdown_ or time_elapse_only_sleep_ is set, and the
  // real sleep is skipped only when no_slowdown_ is set.
  virtual void SleepForMicroseconds(int micros) override {
    sleep_counter_.Increment();
    if (no_slowdown_ || time_elapse_only_sleep_) {
      addon_time_.fetch_add(micros);
    }
    if (!no_slowdown_) {
      target()->SleepForMicroseconds(micros);
    }
  }

  // Reports the wrapped Env's time plus addon_time_. When
  // time_elapse_only_sleep_ is set the wrapped clock is not consulted and the
  // result starts from zero, mirroring NowMicros() and NowNanos().
  virtual Status GetCurrentTime(int64_t* unix_time) override {
    Status s;
    if (time_elapse_only_sleep_) {
      // Give the output a defined starting value; otherwise the addition
      // below would read whatever the caller happened to leave in it.
      *unix_time = 0;
    } else {
      s = target()->GetCurrentTime(unix_time);
    }
    if (s.ok()) {
      // NOTE(review): addon_time_ is accumulated from SleepForMicroseconds()
      // arguments; it is added here without scaling, as before.
      *unix_time += addon_time_.load();
    }
    return s;
  }

  // Forwards to the wrapped Env and counts the call in now_cpu_count_.
  virtual uint64_t NowCPUNanos() override {
    now_cpu_count_.fetch_add(1);
    return target()->NowCPUNanos();
  }

  // Wrapped clock (or zero when time_elapse_only_sleep_ is set) plus
  // addon_time_ scaled by 1000.
  virtual uint64_t NowNanos() override {
    return (time_elapse_only_sleep_ ? 0 : target()->NowNanos()) +
           addon_time_.load() * 1000;
  }

  // Wrapped clock (or zero when time_elapse_only_sleep_ is set) plus
  // addon_time_.
  virtual uint64_t NowMicros() override {
    return (time_elapse_only_sleep_ ? 0 : target()->NowMicros()) +
           addon_time_.load();
  }

  // Forwards to the wrapped Env and counts the call in delete_count_.
  virtual Status DeleteFile(const std::string& fname) override {
    delete_count_.fetch_add(1);
    return target()->DeleteFile(fname);
  }

  Random rnd_;
  port::Mutex rnd_mutex_;  // Lock to protect rnd_

  // sstable Sync() calls are blocked while this flag is true.
  std::atomic<bool> delay_sstable_sync_;

  // sstable writes are dropped on the floor while this flag is true.
  std::atomic<bool> drop_writes_;

  // sstable writes fail with a no-space error while this flag is true.
  std::atomic<bool> no_space_;

  // Intended to simulate a non-writable file system. It is not consulted by
  // any method of this class.
  std::atomic<bool> non_writable_;

  // Sync() of manifest files fails while this flag is true.
  std::atomic<bool> manifest_sync_error_;

  // Append() to manifest files fails while this flag is true.
  std::atomic<bool> manifest_write_error_;

  // Append() to log files fails while this flag is true.
  std::atomic<bool> log_write_error_;

  // Slow down every log write, in micro-seconds.
  std::atomic<int> log_write_slowdown_;

  // Number of WAL files that are still open for write.
  std::atomic<int> num_open_wal_file_;

  // When true, random-access files are wrapped to count reads and bytes.
  bool count_random_reads_;
  anon::AtomicCounter random_read_counter_;
  std::atomic<size_t> random_read_bytes_counter_;
  // Number of NewRandomAccessFile() calls, successful or not.
  std::atomic<int> random_file_open_counter_;

  // When true, sequential files are wrapped to count reads.
  bool count_sequential_reads_;
  anon::AtomicCounter sequential_read_counter_;

  // Number of SleepForMicroseconds() calls.
  anon::AtomicCounter sleep_counter_;

  // Bytes appended to sstable files through this Env.
  std::atomic<int64_t> bytes_written_;

  // Number of Sync() calls on wrapped sstable, manifest and WAL files.
  std::atomic<int> sync_counter_;

  // Percentage (0-100) of NewWritableFile() calls that fail at random.
  std::atomic<uint32_t> non_writeable_rate_;

  // Number of NewWritableFile() calls that got past the random failure check.
  std::atomic<uint32_t> new_writable_count_;

  // Number of upcoming NewWritableFile() calls that must fail.
  std::atomic<uint32_t> non_writable_count_;

  // Invoked before every sstable Append()/PositionedAppend() when non-null.
  std::function<void()>* table_write_callback_;

  // Extra time added to the clocks reported by this Env.
  std::atomic<int64_t> addon_time_;

  // Number of NowCPUNanos() calls.
  std::atomic<int> now_cpu_count_;

  // Number of DeleteFile() calls.
  std::atomic<int> delete_count_;

  // When true, reported time consists only of addon_time_.
  std::atomic<bool> time_elapse_only_sleep_;

  // When true, SleepForMicroseconds() does not actually sleep.
  bool no_slowdown_;

  // Value returned by WalFile::IsSyncThreadSafe().
  std::atomic<bool> is_wal_sync_thread_safe_{true};

  // Last positive compaction_readahead_size seen by NewRandomAccessFile().
  std::atomic<size_t> compaction_readahead_size_{};
};

// Builds a SpecialEnv on top of `base` with every switch off and every counter
// at zero. All members are set in the initializer list, in declaration order,
// so that no atomic is left with an indeterminate value; previously
// random_read_bytes_counter_ and now_cpu_count_ were never initialised.
SpecialEnv::SpecialEnv(Env* base)
    : EnvWrapper(base),
      rnd_(301),
      delay_sstable_sync_(false),
      drop_writes_(false),
      no_space_(false),
      non_writable_(false),
      manifest_sync_error_(false),
      manifest_write_error_(false),
      log_write_error_(false),
      log_write_slowdown_(0),
      num_open_wal_file_(0),
      count_random_reads_(false),
      random_read_bytes_counter_(0),
      random_file_open_counter_(0),
      count_sequential_reads_(false),
      // Only the sleep counter gets an Env; it uses this object as its clock.
      sleep_counter_(this),
      bytes_written_(0),
      sync_counter_(0),
      non_writeable_rate_(0),
      new_writable_count_(0),
      non_writable_count_(0),
      table_write_callback_(nullptr),
      addon_time_(0),
      now_cpu_count_(0),
      delete_count_(0),
      time_elapse_only_sleep_(false),
      no_slowdown_(false) {}
// Returns the processor time reported by clock(), in whole seconds.
int GetTime() {
  const std::clock_t ticks = std::clock();
  return static_cast<int>(ticks / CLOCKS_PER_SEC);
}
class WALHandlerTest : public testing::Test {
 public:
  WALHandlerTest()
      : env_(new SpecialEnv(Env::Default())),
        env_options_(EnvOptions()),
        db_options_(DBOptions()),
        table_cache_(NewLRUCache(50000, 16)),
        write_buffer_manager_(db_options_.db_write_buffer_size),
        dbname_(test::PerThreadDBPath("wal_handler_test")),
        last_sequence_(0),
        thread_id_(0){
    DestroyDB(dbname_, Options());
  };

  void Init(bool allow_concurrent = true) {
    ASSERT_OK(env_->CreateDirIfMissing(dbname_));
    ASSERT_OK(env_->CreateDirIfMissing(ArchivalDirectory(dbname_)));

    db_options_.db_paths.emplace_back(dbname_,
                                      std::numeric_limits<uint64_t>::max());
    db_options_.wal_dir = dbname_;
    db_options_.env = env_;
    db_options_.allow_concurrent_memtable_write = allow_concurrent;
    db_options_.info_log.reset(new StdOutLogger(InfoLogLevel::DEBUG_LEVEL));
    ImmutableDBOptions immu_db_options = ImmutableDBOptions(db_options_);

    versions_.reset(new VersionSet(dbname_, &immu_db_options, env_options_,
                                   table_cache_.get(), &write_buffer_manager_,
                                   &write_controller_));

    wal_handler_.reset(new WALHandler(db_options_, versions_.get()));

    column_family_memtables_.reset(
        new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet()));
    wal_handler_->LoadWAL(default_cf_internal_stats_);
    write_batchs_.clear();
  }
  void Put(const std::string& key, const std::string& value) {
    uint64_t seq = ++last_sequence_;
    WriteBatch batch(key.size() + value.size() + 24);
    batch.Put(key, value);

    SequenceNumber curOne;
    wal_handler_->Write(WriteOptions(), &batch, db_, nullptr);
    test_mutex_.lock();
    write_batchs_.push_back(batch.Data());
    test_mutex_.unlock();
  }
  void Example_Put() {
    for (int k = 0; k < 5; ++k) {
      Put(ToString(k), std::string(k + 5, 'v'));
    }
    wal_handler_->Create();
    Put(ToString(99), std::string(64, 'v'));
    Put(ToString(77), "19972334484字符**");
    Put(ToString(33),
        "µ·å»ºå\u0090ˆæŠ•èµ„ç®¡ç\u0090†å»ºä¿¡åŸºé‡‘ç®¡ç\u0090†æœ‰é™\u0090è´£ä»»"
        "å…¬å\u008FMèµ„æœ¬æœ›æ\u00AD£èµ„äº§ç›¸è\u0081šèµ„æœ¬å† é¡¶èµ„æœ");
    wal_handler_->Create();
    Put("daads", "字符太大，不能用于包围字符字面类型首先字节数组。");
    Put("皮卡丘", "锘挎槬鐪犱笉瑙夋檽锛屽澶勯椈鍟奸笩。");
  }
  Status TEST_Iterate(int& data_sum, std::vector<std::string>& str) {
    return wal_handler_->TEST_Iterate(data_sum, str);
  }
  void TEST_GetLogFilesNumber(std::vector<uint64_t>& log_files_number) {
    std::vector<std::string> all_files;
    env_->GetChildren("/home/hope/data0", &all_files);
    for (const auto& f : all_files) {
      uint64_t number = 0;
      FileType type;
      if (ParseFileName(f, &number, &type) && type == kLogFile) {
        log_files_number.emplace_back(number);
      }
    }
  }

  SpecialEnv* env_;
  EnvOptions env_options_;
  DBOptions db_options_;
  std::unique_ptr<WALHandler> wal_handler_;
  std::shared_ptr<Cache> table_cache_;
  WriteBufferManager write_buffer_manager_;
  WriteController write_controller_;

  std::mutex test_mutex_;
  DB* db_;
  std::string dbname_;
  std::unique_ptr<VersionSet> versions_;
  std::unique_ptr<ColumnFamilyMemTablesImpl> column_family_memtables_;
  mutable InstrumentedMutex mutex_;

  InternalStats* default_cf_internal_stats_;

  std::vector<std::string> write_batchs_;
  std::unique_ptr<log::Writer> current_log_writer_;
  uint64_t last_sequence_;
  int thread_id_;
  struct KeyComparator : public MemTableRep::KeyComparator {
    const InternalKeyComparator comparator;
    explicit KeyComparator(const InternalKeyComparator& c) : comparator(c) {}
    virtual int operator()(const char* prefix_len_key1,
                           const char* prefix_len_key2) const override;
    virtual int operator()(const char* prefix_len_key,
                           const DecodedType& key) const override;
    virtual int operator()(const Slice &key1, const Slice &key2,
                           SequenceNumber seq1, SequenceNumber seq2) const override ;
    virtual int operator()(const Slice &key, SequenceNumber seq,
                           const char *prefix_len_key) const override ;
    virtual int operator()(const Slice &key_a, SequenceNumber seq,
                           const Slice &key_b) const override;
  };
};
// Compares two length-prefixed encoded keys by decoding both and delegating
// to the wrapped comparator.
int WALHandlerTest::KeyComparator::operator()(
    const char* prefix_len_key1, const char* prefix_len_key2) const {
  const Slice lhs = GetLengthPrefixedSlice(prefix_len_key1);
  const Slice rhs = GetLengthPrefixedSlice(prefix_len_key2);
  const int order = comparator.CompareKeySeq(lhs, rhs);
  return order;
}
// Compares a length-prefixed encoded key against an already decoded key.
int WALHandlerTest::KeyComparator::operator()(
    const char* prefix_len_key,
    const MemTableRep::KeyComparator::DecodedType& key) const {
  // Strip the length prefix from the encoded side before comparing.
  const Slice decoded = GetLengthPrefixedSlice(prefix_len_key);
  const int order = comparator.CompareKeySeq(decoded, key);
  return order;
}
// Compares two (key, sequence number) pairs by forwarding all four arguments
// to the wrapped comparator's CompareKeySeq().
int WALHandlerTest::KeyComparator::operator()(const Slice &key1,
                                              const Slice &key2,
                                              SequenceNumber seq1,
                                              SequenceNumber seq2) const {
  return comparator.CompareKeySeq(key1, key2, seq1, seq2);
}
// Compares a (key, sequence number) pair against a length-prefixed encoded
// key.
int WALHandlerTest::KeyComparator::operator()(
    const Slice &key, SequenceNumber seq, const char *prefix_len_key) const {
  // Strip the length prefix from the encoded side before comparing.
  const Slice decoded = GetLengthPrefixedSlice(prefix_len_key);
  const int order = comparator.CompareKeySeq(key, seq, decoded);
  return order;
}
// Compares a (key, sequence number) pair against a second key by forwarding
// the arguments unchanged to the wrapped comparator's CompareKeySeq().
int WALHandlerTest::KeyComparator::operator()(const Slice &key_a,
                                              SequenceNumber seq,
                                              const Slice &key_b) const {
  return comparator.CompareKeySeq(key_a, seq, key_b);
}

// Writes records in three rounds with concurrent memtable writes allowed and
// checks, after each round, the count reported by TEST_Iterate(); after the
// second and third rounds the returned records are also compared, in order,
// with the batches recorded by Put().
TEST_F(WALHandlerTest, WriteToWALTest) {
  Init();
  for (int k = 0; k < 5; ++k) {
    Put(ToString(k), std::string(k + 5, 'v'));
  }
  int data_size = 0;
  std::vector<std::string> records;
  ASSERT_OK(TEST_Iterate(data_size, records));
  ASSERT_EQ(data_size, 5);
  records.clear();
  Put("daads", "字符太大，不能用于包围字符字面类型首先字节数组。");
  Put("皮卡丘", "锘挎槬鐪犱笉瑙夋檽锛屽澶勯椈鍟奸笩。");
  ASSERT_OK(TEST_Iterate(data_size, records));
  ASSERT_EQ(data_size, 7);
  size_t it = 0;
  for (const auto& record : records) {
    // Fail cleanly instead of reading past the end of write_batchs_ if more
    // records come back than were written.
    ASSERT_LT(it, write_batchs_.size());
    ASSERT_EQ(write_batchs_[it++], record);
  }

  Put(ToString(99), std::string(64, 'v'));
  Put(ToString(77), "19972334484字符**");
  Put(ToString(33),
      "µ·å»ºå\u0090ˆæŠ•èµ„ç®¡ç\u0090†å»ºä¿¡åŸºé‡‘ç®¡ç\u0090†æœ‰é™\u0090è´£ä»»å…"
      "¬å\u008FMèµ„æœ¬æœ›æ\u00AD£èµ„äº§ç›¸è\u0081šèµ„æœ¬å† é¡¶èµ„æœ");
  records.clear();
  it = 0;
  ASSERT_OK(TEST_Iterate(data_size, records));
  for (const auto& record : records) {
    ASSERT_LT(it, write_batchs_.size());
    ASSERT_EQ(write_batchs_[it], record);
    it++;
  }
  ASSERT_EQ(data_size, 10);
}

// Same scenario as WriteToWALTest, but with concurrent memtable writes
// disabled through Init(false).
TEST_F(WALHandlerTest, SerialWriteToWALTest) {
  Init(false);
  for (int k = 0; k < 5; ++k) {
    Put(ToString(k), std::string(k + 5, 'v'));
  }
  int data_size = 0;
  std::vector<std::string> records;
  ASSERT_OK(TEST_Iterate(data_size, records));
  ASSERT_EQ(data_size, 5);
  records.clear();
  Put("daads", "字符太大，不能用于包围字符字面类型首先字节数组。");
  Put("皮卡丘", "锘挎槬鐪犱笉瑙夋檽锛屽澶勯椈鍟奸笩。");
  ASSERT_OK(TEST_Iterate(data_size, records));
  ASSERT_EQ(data_size, 7);
  size_t it = 0;
  for (const auto& record : records) {
    // Fail cleanly instead of reading past the end of write_batchs_ if more
    // records come back than were written.
    ASSERT_LT(it, write_batchs_.size());
    ASSERT_EQ(write_batchs_[it++], record);
  }

  Put(ToString(99), std::string(64, 'v'));
  Put(ToString(77), "19972334484字符**");
  Put(ToString(33),
      "µ·å»ºå\u0090ˆæŠ•èµ„ç®¡ç\u0090†å»ºä¿¡åŸºé‡‘ç®¡ç\u0090†æœ‰é™\u0090è´£ä»»å…"
      "¬å\u008FMèµ„æœ¬æœ›æ\u00AD£èµ„äº§ç›¸è\u0081šèµ„æœ¬å† é¡¶èµ„æœ");
  records.clear();
  it = 0;
  ASSERT_OK(TEST_Iterate(data_size, records));
  for (const auto& record : records) {
    ASSERT_LT(it, write_batchs_.size());
    ASSERT_EQ(write_batchs_[it], record);
    it++;
  }
  ASSERT_EQ(data_size, 10);
}

// Issues one million rounds of three puts each and then expects
// WALHandler::GetLogfileNumber() to report 6.
TEST_F(WALHandlerTest, MaxWALSizeTest) {
  Init();

  const int kRounds = 1000000;
  int round = 0;
  while (round < kRounds) {
    // Keys within one round share the same base and differ by suffix.
    const int base = round * 100;
    Put(ToString(base + 99), std::string(64, 'v'));
    Put(ToString(base + 77), "19972334484字符**");
    Put(ToString(base + 33),
        "µ·å»ºå\u0090ˆæŠ•èµ„ç®¡ç\u0090†å»ºä¿¡åŸºé‡‘ç®¡ç\u0090†æœ‰é™\u0090è´£ä»»"
        "å…¬å\u008FMèµ„æœ¬æœ›æ\u00AD£èµ„äº§ç›¸è\u0081šèµ„æœ¬å† é¡¶èµ„æœ");
    ++round;
  }
  int reported_log_number = wal_handler_->GetLogfileNumber();
  ASSERT_EQ(reported_log_number, 6);
}

// Same as MaxWALSizeTest, but with concurrent memtable writes disabled
// through Init(false).
TEST_F(WALHandlerTest, SerialMaxWALSizeTest) {
  Init(false);

  const int kRounds = 1000000;
  int round = 0;
  while (round < kRounds) {
    // Keys within one round share the same base and differ by suffix.
    const int base = round * 100;
    Put(ToString(base + 99), std::string(64, 'v'));
    Put(ToString(base + 77), "19972334484字符**");
    Put(ToString(base + 33),
        "µ·å»ºå\u0090ˆæŠ•èµ„ç®¡ç\u0090†å»ºä¿¡åŸºé‡‘ç®¡ç\u0090†æœ‰é™\u0090è´£ä»»"
        "å…¬å\u008FMèµ„æœ¬æœ›æ\u00AD£èµ„äº§ç›¸è\u0081šèµ„æœ¬å† é¡¶èµ„æœ");
    ++round;
  }
  int reported_log_number = wal_handler_->GetLogfileNumber();
  ASSERT_EQ(reported_log_number, 6);
}

// Serializes the hand-out of writer ids in WriteDataTest().
std::mutex mu;

// Thread body for the concurrent tests: takes the next writer id, then issues
// twenty puts through the fixture with keys id*100 .. id*100+19. The id
// counter is a function-local static, so ids keep growing across tests that
// run in the same process.
void WriteDataTest(class WALHandlerTest* threadarg) {
  static int threads_id = 0;
  int writer_id;
  {
    std::lock_guard<std::mutex> guard(mu);
    writer_id = ++threads_id;
  }
  WALHandlerTest* const fixture = threadarg;
  int offset = 0;
  while (offset < 20) {
    fixture->Put(ToString(writer_id * 100 + offset), std::string(64, 'v'));
    ++offset;
  }
}

// Runs NUM_THREADS writer threads of twenty puts each and expects
// TEST_Iterate() to report 400 afterwards.
TEST_F(WALHandlerTest, ConcurrentWriteToWALTest) {
  Init();
  std::thread threads[NUM_THREADS];
  for (auto& writer : threads) {
    writer = std::thread(WriteDataTest, this);
  }
  for (int i = 0; i < NUM_THREADS; ++i) {
    threads[i].join();
  }
  std::vector<std::string> records;
  int data_size;
  const Status iterate_status = TEST_Iterate(data_size, records);
  ASSERT_OK(iterate_status);
  ASSERT_EQ(data_size, 400);
}

// Same as ConcurrentWriteToWALTest, but with concurrent memtable writes
// disabled through Init(false).
TEST_F(WALHandlerTest, SerialConWriteToWALTest) {
  Init(false);
  std::thread threads[NUM_THREADS];
  for (auto& writer : threads) {
    writer = std::thread(WriteDataTest, this);
  }
  for (int i = 0; i < NUM_THREADS; ++i) {
    threads[i].join();
  }
  std::vector<std::string> records;
  int data_size;
  const Status iterate_status = TEST_Iterate(data_size, records);
  ASSERT_OK(iterate_status);
  ASSERT_EQ(data_size, 400);
}

//TEST_F(WALHandlerTest, CreateWALPerHourTest) {
//  Init();
//  int key_num = 0;
//  std::string key_t;
//  key_t = "key" + NumberToString(key_num);
//  Put(key_t, std::string(4, 'v'));
//
//  std::this_thread::sleep_for(std::chrono::seconds(3600));
//  Put("hello", std::string(4, 'v'));
//  ASSERT_EQ(wal_handler_->GetLogfileNumber(), 2);
//}

// Writes the Example_Put() records, re-initialises the fixture, replays the
// log files into a fresh skip-list memtable and checks that iterating the
// memtable yields every key/value pair in key order. Finally compares the
// sequence number of the last memtable entry with the count reported by
// TEST_Iterate().
TEST_F(WALHandlerTest, RecoverTest) {
  Init();
  Example_Put();
  Init();
  InternalKeyComparator cmp(BytewiseComparator());
  Options options;
  options.memtable_factory = std::make_shared<SkipListFactory>();
  ImmutableCFOptions ioptions(options);
  WriteBufferManager wb(options.db_write_buffer_size);
  MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
                               kMaxSequenceNumber, 0 /* column_family_id */);
  mem->Ref();
  ColumnFamilyMemTablesDefault cf_mems_default(mem);
  // Start from a defined value so the line printed below is meaningful even
  // if recovery does not assign it.
  SequenceNumber lastSequence = 0;
  wal_handler_->RecoverLogFilesToTables(&cf_mems_default, lastSequence);
  std::cout << lastSequence << std::endl;

  std::string state;
  SequenceNumber readSeq = 0;
  {
    // The iterator lives in this scope so that it is destroyed before the
    // memtable it walks over is deleted at the end of the test.
    Arena arena;
    ScopedArenaIterator arena_iter_guard;
    InternalIterator* iter = mem->NewIterator(ReadOptions(), &arena);
    arena_iter_guard.set(iter);
    ParsedInternalKey ikey;
    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
      ikey.clear();
      EXPECT_TRUE(ParseInternalKey(iter->key(), &ikey));
      state.append(ikey.user_key.ToString());
      state.append("@");
      state.append(iter->value().ToString());
    }
    ASSERT_EQ(
        state,
        "0@vvvvv"
        "1@vvvvvv"
        "2@vvvvvvv"
        "3@vvvvvvvv"
        "33@"
        "µ·å»ºå\u0090ˆæŠ•èµ„ç®¡ç\u0090†å»ºä¿¡åŸºé‡‘ç®¡ç\u0090†æœ‰é™\u0090è´£ä»»å…"
        "¬å\u008FMèµ„æœ¬æœ›æ\u00AD£èµ„äº§ç›¸è\u0081šèµ„æœ¬å† é¡¶èµ„æœ"
        "4@vvvvvvvvv"
        "77@19972334484字符**"
        "99@vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv"
        "daads@字符太大，不能用于包围字符字面类型首先字节数组。"
        "皮卡丘@锘挎槬鐪犱笉瑙夋檽锛屽澶勯椈鍟奸笩。");
    iter->SeekToLast();
    // The key may only be read while the iterator is positioned on an entry.
    ASSERT_TRUE(iter->Valid());
    ASSERT_TRUE(ParseInternalKey(iter->key(), &ikey));
    readSeq = ikey.sequence;
  }
  int data_size = 0;
  std::vector<std::string> records;
  ASSERT_OK(TEST_Iterate(data_size, records));
  // Compare in the unsigned sequence-number type to avoid a mixed-sign
  // comparison.
  ASSERT_EQ(static_cast<SequenceNumber>(data_size), readSeq);
  delete mem->Unref();
}

}  // namespace rocksdb
// Test binary entry point: lets GoogleTest consume its command-line flags and
// then runs every registered test, returning the framework's exit code.
int main(int argc, char** argv) {
  ::testing::InitGoogleTest(&argc, argv);
  const int exit_code = RUN_ALL_TESTS();
  return exit_code;
}
